tf读取文件
参考:http://xiangruix.com/2018/05/20/tensorflow-inputdata/
读取本地文件
placeholder读取
实质:占位符的读取实际上是定义了个固定格式的结构(参数),并基于这个占位符定义读取方式。当读取具体数据时,则将真实数据传到占位符里面进行读取
queue_runner读取
参考:https://zhuanlan.zhihu.com/p/27238630
实质:
# 将文件打包成quene类型,tf将从quene读取目录、文件【但是文件读取是无序的】
filename_queue = tf.train.string_input_producer(filename)
# 创建reader,目的是适应文件的不同数据结构,通过reader.read按照相应的结构读取文件数据,reader返回一个tensor
reader = tf.TextLinearReader()
key, value = reader.read(filename_quene)
dataset读取
参考:https://zhuanlan.zhihu.com/p/33223782;https://zhuanlan.zhihu.com/p/33223782
dataset和iterator的关系
- 在tensorflow中,新版本提倡用dataset处理tfrecord数据,而不提倡使用TFRecordReader读取
- dataset实质是个水池,用来装载数据,但是无法直接展示数据,而数据是水池中的水滴,其展示是通过水龙头(iterator)的控制来决定展示数据的多少
将数据输入到内存中,然后将其转化为dataset
- 实质:按行读入文件后,将每行数据按照一个或者多个tensor存入到dataset中
- 适用:小样本数据量
测试代码
ids = [] features = [] with open(filename) as f: for line in f: l = line.strip().split('\t') ids.append(list(map(lambda x: int(x.strip()), l[0].strip().split('-')) features.append(list(map(lambda x: x.strip(), l[1].strip().split(',')) data = tf.data.Dataset.from_tensor_slics((ids, features)) # 可查看存入数据的数据类型 print dataset.output_types print dataset.output_shapes
通过protobuffer序列化存储数据为tfrecord格式,直接读取其为dataset数据
- 实质:是以字典的方式一次写入一个样本,实质是一个二进制文件
- 优点:在设置字段的数据类型后,后续可以指定特征是以标签或者连续特征的形式处理。即一般处理中单数的特征也按照标量处理,但是通过tfrecord存储的特征可以不是标量(向量or矩阵or任何维度的张量)
- 测试代码
# 生成tfrecord文件,要求输入必须为向量格式 tf.train.Feature(int64_list=tf.train.Int64List(value=[int64])) # 整型 tf.train.Feature(bytes_list=tf.train.BytesList(value=[bytes])) # 字符串 tf.train.Feature(float_list=tf.train.FloatList(value=[float])) # 实数列表 # 创建写入类,将序列化数据写入到TFRecord中 writer = tf.python_io.TFTrcordWriter("data.tfrecords") # 将文件转化为tfRecord格式 with open(filename) as f: for line in f: l = line.strip().split('\t') idd = list(map(lambda x: int(x.strip()), l[0].strip().split('-'))) fea = list(map(lambda x: x.strip(), l[1].strip().split(','))) # 将数据规则化,tf.train.Example包含key-数据结构(类似dict),初始化必须传入tf.train.Features对象 example = tf.train.Example( features = tf.train.Features( feature = { "ids":tf.train.Feature(int64_list=tf.train.Int64List(idd)) "feature":tf.train.Feature(bytes_list=tf.train.BytesList(fea)) } ) ) # 将Example序列化成字符串,并写入到TFRecord中 writer.write(example.SerializeToString()) writer.close() # 读取tfRecord文件 ## 创建tfRecordDataset dataset = tf.data.TFRecordDataset("data.tfrecords") print dataset.output_types print dataset.output_shapes ## 因为TFRecordDataset直接读进来的数据是字符串格式,需要解析数据按格式 ### tf.FixedLenFeature将输入的数据转化成需要的格式 def parse_string(example_proto): features = { "ids":tf.FixedLenFeature([3], tf.int64) "feature":tf.VarLenFeature(tf.string) } # 将输入数据转化为指定结构 parsed_feature = tf.parse_single_example(example_proto, features) return parsed_feature["ids"], parsed_feature["feature"] dataset = dataset.map(parse_string) print dataset.output_types print dataset.output_shapes
特征解析方式有两种
tf.FixedLenFeature(shape, dtype, default_value) -- 定长特征解析,返回的为稀疏表示
tf.VarLenFeature(dtype) -- 不定长的特征解析,返回为密集表示
直接从文本数据读入
- 实质:直接从文本文件读入数据,进来的每个元素为一个tf.string的一个tensor,然后解析成指定的结构
- 注意:目前tensorflow的字符串的处理函数直接处理结构比较复杂,这里采用函数处理
- 测试代码
# 构建string处理函数 def parse_line(line): l = line.strip().split('\t') idd = list(map(lambda x: int(x.strip()), l[0].strip().split('-'))) feature = list(map(lambda x: x.strip(), l[1].strip().split('-'))) return idd, feature def parse_string(s): return tf.py_func(parse_line, [s], [tf.int64, tf.string]) # 构建TFRecord读取器 dataset = tf.data.TextLineDataset([filename]) print dataset.output_types print dataset.output_shapes dataset = dataset.map(parse_strnig) print dataset.output_types print dataset.output_shapes
tf.py_func:
- 功能:包装一个python函数,在计算图中计算节点中使用
构建迭代器读取dataset的数据
- 说明:
- 存入dataset的数据并不能直接处理,需要实例化迭代器取出其中元素,进行处理
- 当dataset的元素通过get_next()函数取完时,会抛出异常tf.errors.OutOfRangeError
- 迭代器返回的是tensor,在sess.run后得到的数据为nupmy下的数据类型
迭代器:
- one-shot
iterator = dataset.make_one_shot_iterator() idd, feature = iterator.get_next() while True:˙ try: print sess.run(idd,feature) except tf.errors.OutOfRangeError: print "end dataset" break
- initializable
iterator = dataset.make_initializable_iterator()
- reintializable
- feedable
- 说明:
完整的读取tfrecord的代码
input_path = ['./test/train_predict.tfrecord']
params = dict(zip(['num_epochs', 'capacity', 'min_after_dequeue', 'batch_size'],[5, 500, 100, 5]))
########################################################################################
# 按照TFRecordDataset读取数据
########################################################################################
dataset = tf.data.TFRecordDataset(input_path)
# 1. 解析数据
def deal_data(serial_exmp):
feats = tf.parse_example([serial_exmp],
features = {
'index': tf.FixedLenFeature([], tf.int64),
'label': tf.FixedLenFeature([], tf.int64),
'features' : tf.VarLenFeature(tf.int64),
'values' : tf.VarLenFeature(tf.int64)
})
index = feats['index']
label = feats['label']
feature = feats['features']
value = feats['values']
return index, label, feature, value
features = dataset.map(deal_data)
# 2. 分发多个epoch处理
features = features.repeat(params['num_epochs']).shuffle(params['batch_size']).batch(params['batch_size'])
# 3. 创建iterator提取数据
iter_features = features.make_one_shot_iterator()
# 4. string_handle()返回一个tensor
with tf.Session() as sess:
sess.run(iter_features.get_next())
读取hdfs文件
参考: